package com.zimmor.mq.rocket_train.base;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author:zimmor
 * @Date: 2023/2/14 17:36
 * @Description pull的方式消费消息Subscribe模式
 * @version: 1.0.0
 */

public class LiteSubConsumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumer-group6");
        //设置namesrv
        consumer.setNamesrvAddr("192.168.204.136:9876;192.168.204.137:9876");
        //订阅主题
        consumer.subscribe("base1", "tag1");
        //设置最大拉去消息数目
        consumer.setPullBatchSize(10);
        //启动consumer
        consumer.start();
        boolean running = true;
        //监听消息
        try{
            while (running) {
                List<MessageExt>  pollResult = consumer.poll();
                for (MessageExt messageExt : pollResult) {
                    System.out.println("msg = " + new String(messageExt.getBody()));
                }
                System.out.println("==================================");
            }
        }finally {
            consumer.shutdown();
        }
        
    }
}
